/* Copyright (c) 2022 渝州大数据实验室
 *
 * Lanius is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *     http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */
package org.yzbdl.lanius.orchestrate.serv.quartz.handler.category.impl;

import cn.hutool.core.lang.TypeReference;
import cn.hutool.core.util.URLUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.xerces.impl.dv.util.Base64;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cglib.beans.BeanMap;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.ResourceAccessException;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.yzbdl.lanius.orchestrate.serv.constant.*;
import org.yzbdl.lanius.orchestrate.serv.constant.schedule.kettle.KettleConstant;
import org.yzbdl.lanius.orchestrate.serv.constant.schedule.kettle.KettleXmlConstant;
import org.yzbdl.lanius.orchestrate.serv.dto.resource.ServerProgramInfoDTO;
import org.yzbdl.lanius.orchestrate.serv.dto.task.TaskInstanceResourceDTO;
import org.yzbdl.lanius.orchestrate.serv.dto.task.TaskPlanResourceDTO;
import org.yzbdl.lanius.orchestrate.serv.entity.resource.ServerEntity;
import org.yzbdl.lanius.orchestrate.serv.entity.resource.TaskResourceConfigEntity;
import org.yzbdl.lanius.orchestrate.serv.entity.resource.TaskResourceEntity;
import org.yzbdl.lanius.orchestrate.serv.entity.task.TaskInstance;
import org.yzbdl.lanius.orchestrate.serv.entity.task.TaskStepLog;
import org.yzbdl.lanius.orchestrate.serv.enums.*;
import org.yzbdl.lanius.orchestrate.serv.quartz.handler.category.ProgramCategoryExecutorFactory;
import org.yzbdl.lanius.orchestrate.serv.quartz.handler.category.ProgramCategoryExecutorInvokerHandler;
import org.yzbdl.lanius.orchestrate.serv.quartz.handler.resourcetype.ResourceTypeFactory;
import org.yzbdl.lanius.orchestrate.serv.quartz.handler.resourcetype.ResourceTypeInvokerHandler;
import org.yzbdl.lanius.orchestrate.serv.service.resource.ServerProgramService;
import org.yzbdl.lanius.orchestrate.serv.service.task.TaskInstanceService;
import org.yzbdl.lanius.orchestrate.serv.service.task.TaskStepLogService;
import org.yzbdl.lanius.orchestrate.serv.utils.*;
import org.yzbdl.lanius.orchestrate.serv.vo.task.TaskStepLogVO;
import org.yzbdl.lanius.orchestrate.serv.vo.task.schedule.TaskMonitorThreadVO;
import org.yzbdl.lanius.orchestrate.common.exception.runtime.BusinessException;
import org.yzbdl.lanius.orchestrate.common.utils.HttpPostUtils;
import org.yzbdl.lanius.orchestrate.common.utils.MessageUtil;
import org.yzbdl.lanius.orchestrate.common.xml.XMLHandler;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.ConnectException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.time.LocalDate;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

/**
 * Kettle平台执行器
 *
 * @author jinchunzhao@yzbdl.ac.cn
 * @date 2022-04-20 9:17
 */
@Component
@Slf4j
public class KettleExecutorHandler implements ProgramCategoryExecutorInvokerHandler {

    /** Executor on which the asynchronous task-monitoring work is submitted. */
    @Autowired
    private ThreadPoolExecutor executor;

    /** Service used to load and update task instances. */
    @Autowired
    private TaskInstanceService taskInstanceService;

    /** Task step log service. */
    @Autowired
    private TaskStepLogService taskStepLogService;

    /** Service used to look up server program (server node) information. */
    @Autowired
    private ServerProgramService serverProgramService;

    /**
     * Token prefix.
     */
    private static final String BASIC = "Basic ";

    /**
     * Response result: Server Error.
     */
    private static final String RES_SERVER_ERROR = "Server Error";

    /**
     * Response result: ERROR.
     */
    private static final String RES_ERROR = "ERROR";

    /**
     * Response result: ERROR wrapped in a {@code <result>} XML element.
     */
    private static final String RES_XML_ERROR = "<result>ERROR</result>";

    /**
     * URL query parameter: carteObjectId.
     */
    private static final String URL_PARAM_CARTE_OBJECT_ID = "&carteObjectId=";

    /**
     * URL query parameter: rep.
     */
    private static final String URL_PARAM_REP = "&rep=";

    /**
     * URL query parameter: user.
     */
    private static final String URL_PARAM_USER = "&user=";

    /**
     * URL query parameter: pass.
     */
    private static final String URL_PARAM_PASS = "&pass=";

    /**
     * Separator (slash).
     */
    private static final String CONN_SLASH = "/";

    /**
     * Equals sign placed between a URL parameter name and its value.
     */
    private static final String URL_EQUAL_SIGN = "=";

    /**
     * Sends the execution request for a task plan to the server node and, when the request is
     * accepted, starts the monitoring flow for the task instance.
     *
     * <p>Flow as implemented below: build the execute URL, request body and headers; POST the form
     * data; on a server-error response (or an {@link IllegalArgumentException}) mark the instance
     * as failed; on a {@link ResourceAccessException} delegate to {@code spinCheckConn}; otherwise
     * mark the instance as in service and call {@code taskMonitorThread}. Any other exception is
     * handed to {@code timeOutHandlerProcess}.
     *
     * @param taskInstance        task instance being executed (its id and task info are read)
     * @param taskPlanResourceDTO task plan together with its resource information
     * @param serverProgram       server node the request is sent to
     * @param isIncrementalLog    passed through to the monitoring flow
     */
    @Override
    public void executorTaskPlanJob(TaskInstance taskInstance, TaskPlanResourceDTO taskPlanResourceDTO,
                                    ServerProgramInfoDTO serverProgram, Boolean isIncrementalLog) {
        String baseUrl = StringUtils.EMPTY;
        String execUrlBackup = StringUtils.EMPTY;
        Long taskInstanceId = taskInstance.getId();
        Long taskPlanId = taskPlanResourceDTO.getId();
        Long orgId = taskPlanResourceDTO.getOrgId();
        Long groupId = taskPlanResourceDTO.getGroupId();
        String currentDate = DateUtil.format(new Date(), DateUtil.DATE_FORMAT_SECOND);
        try {
            // Base URL of the server node
            baseUrl = getBaseUrl(serverProgram);
            // Request parameters for this resource type
            List<String> requestParams = getRequestParam(taskPlanResourceDTO.getTaskResourceEntity().getResourceType());
            // URL the execution request is sent to
            String execUrl = getExecuteUrl(baseUrl, requestParams, taskPlanResourceDTO.getLogLevel());
            String taskInfo = taskInstance.getTaskInfo();
            // Backup URL, kept for reference only
            execUrlBackup =
                    getExecUrlBackup(execUrl, taskInfo, taskPlanResourceDTO.getTaskResourceEntity(), requestParams);
            MultiValueMap<String, Object> requestBody = getRequestBody(taskPlanResourceDTO, requestParams, taskInfo);
            Map<String, String> headers = getRequestHeaders(taskPlanResourceDTO, serverProgram.getAuthConfig());
            String result = StringUtils.EMPTY;
            String resultStr = StringUtils.EMPTY;
            String msg;
            try {
                result = HttpPostUtils.getInstance().postWithFormData(execUrl, headers, requestBody);
                // 3. Failure
                // NOTE(review): result.contains(...) throws NullPointerException if the POST returns null;
                // that would be routed to the outer catch below — confirm this is intended.
                if (Objects.equals(result, RES_SERVER_ERROR) || result.contains(RES_XML_ERROR)) {
                    msg = String.format("%s - 任务执行调度失败，服务器错误！计划名称：%s！返回信息：\r\n %s", currentDate,
                            taskPlanResourceDTO.getTaskName(), result);
                    log.error(msg);
                    TaskInstanceLogUtil.writerLog(orgId, groupId, taskPlanId, taskInstanceId, msg);
                    resultStr = RES_SERVER_ERROR;
                } else {
                    // 3. Request succeeded
                    msg = String.format("%s - 任务调度指令发送成功！\r\n", currentDate);
                    TaskInstanceLogUtil.writerLog(orgId, groupId, taskPlanId, taskInstanceId, msg);
                }
            } catch (IllegalArgumentException e) {
                resultStr = RES_ERROR;
                result = e.getMessage();
            } catch (ResourceAccessException e) {
                // The request timed out but the server node may still be executing,
                // so check here whether the connection actually succeeded
                resultStr = spinCheckConn(e, baseUrl, taskInfo, taskPlanResourceDTO, serverProgram, taskInstanceId);
            }
            // 3. Failure (end)
            if (Objects.equals(resultStr, RES_ERROR) || Objects.equals(resultStr, RES_SERVER_ERROR)) {
                // NOTE(review): the "[{}}]" placeholder below has a stray closing brace — looks like a typo.
                log.error("服务节点执行此任务失败[{}}]:{} \r\n以下消息来自服务节点\r\n {}", resultStr, execUrl, result);
                updateTaskInstanceStatus(taskInstanceId, TaskInstanceStatusEnum.FAIL.getCode(), execUrlBackup);
            } else {
                // 4. Initializing
                updateTaskInstanceStatus(taskInstanceId, TaskInstanceStatusEnum.IN_SERVICE.getCode(), execUrlBackup);
                // 5. Enter the monitoring flow: start the monitoring thread
                taskMonitorThread(taskPlanResourceDTO, baseUrl, taskInstance, serverProgram, currentDate,
                        isIncrementalLog);
            }
        } catch (Exception e) {
            // 7. Request timed out: initialization failed
            timeOutHandlerProcess(e, currentDate, baseUrl, execUrlBackup, taskPlanResourceDTO, taskInstanceId);
        }
    }

    /**
     * Requests the image of the transformation/job behind a task instance from the server node.
     *
     * <p>Every missing prerequisite (task instance, task resource, resource type, resource-type
     * handler, server program / server entity, resource-type value) is reported with the same
     * "image not found" {@link BusinessException}.
     *
     * @param taskInstanceId  id of the task instance whose image is requested
     * @param serverProgramId id of the server program that is asked for the image
     * @return the value produced by {@code HttpUtil.getImageBase64FromResponse} for the image URL
     *         (presumably the Base64-encoded image — confirm against that helper)
     * @throws BusinessException if any of the prerequisites listed above is missing
     */
    @Override
    public String transImage(Long taskInstanceId, Long serverProgramId) {

        TaskInstanceResourceDTO taskInstanceResourceDTO = taskInstanceService.getTaskInstanceResourceIgnoreTenantId(taskInstanceId);
        if (Objects.isNull(taskInstanceResourceDTO)) {
            throw new BusinessException(MessageUtil.get("task.instance.image_not_found"));
        }
        // The task resource carries the entrance name and the resource type
        TaskResourceEntity taskResourceEntity = taskInstanceResourceDTO.getTaskResourceEntity();
        if (Objects.isNull(taskResourceEntity)) {
            throw new BusinessException(MessageUtil.get("task.instance.image_not_found"));
        }

        // Guard the resource type before calling toString() on it: a null type used to surface
        // as a NullPointerException instead of the business error used by the other checks.
        Integer resourceType = taskResourceEntity.getResourceType();
        if (Objects.isNull(resourceType)) {
            throw new BusinessException(MessageUtil.get("task.instance.image_not_found"));
        }
        // The resource-type handler supplies the image request path
        ResourceTypeInvokerHandler handler = ResourceTypeFactory.getHandler(resourceType.toString());
        if (Objects.isNull(handler)) {
            throw new BusinessException(MessageUtil.get("task.instance.image_not_found"));
        }

        // Server node information is needed for the base URL
        ServerProgramInfoDTO serverProgram =
                serverProgramService.getServerProgramInfoByIdIgnoreTenantId(serverProgramId);
        if (Objects.isNull(serverProgram) || Objects.isNull(serverProgram.getServerEntity())) {
            throw new BusinessException(MessageUtil.get("task.instance.image_not_found"));
        }
        String value = ResourceTypeEnum.matchValue(resourceType);
        if (StringUtils.isBlank(value)) {
            throw new BusinessException(MessageUtil.get("task.instance.image_not_found"));
        }

        String entrance = getEntrance(taskResourceEntity);
        String entranceName = getEntranceFullName(entrance);
        String baseUrl = getBaseUrl(serverProgram);
        String carteObjectId = getCarteObjectId(taskInstanceResourceDTO.getTaskInfo());
        StringBuilder urlBuilder = new StringBuilder(baseUrl);
        urlBuilder.append(handler.getImageUrL()).append(KettleConstant.QUERY_URL_PARAM_NAME).append(URLUtil.encode(entranceName));
        // The carteObjectId parameter is only sent when the task info contains one
        if (Objects.nonNull(carteObjectId)) {
            urlBuilder.append(KettleConstant.QUERY_URL_PARAM_CARTE_OBJECT_ID).append(carteObjectId);
        }
        urlBuilder.append(KettleConstant.QUERY_URL_PARAM_KTR_TYPE).append(value);
        Map<String, Object> resourceContentMap = resourceContentToMap(taskResourceEntity);

        // Assemble the repository information sent as request headers
        Map<String, Object> headers = getImgReader(resourceContentMap, serverProgram.getAuthConfig(), taskInstanceResourceDTO);
        return HttpUtil.getImageBase64FromResponse(urlBuilder.toString(), headers);
    }


    /**
     * Registers this handler with {@link ProgramCategoryExecutorFactory} under the string form of
     * the KETTLE program category code.
     */
    @Override
    public void afterPropertiesSet() {
        String categoryKey = ProgramCategoryEnum.KETTLE.getCode().toString();
        ProgramCategoryExecutorFactory.register(categoryKey, this);
    }

    /**
     * Builds the complete execution URL that is kept with the execution record for reference.
     *
     * <p>The repository name, account and password appended here are read from the task
     * resource's resource content (table {@code lo_task_resource}, column
     * {@code resource_content}).
     *
     * <p>NOTE(review): the returned URL contains the repository credentials in plain text —
     * confirm that storing it with the execution record is acceptable.
     *
     * @param execUrl            execution URL the extra parameters are appended to
     * @param taskInfo           task instance JSON information (source of the carteObjectId)
     * @param taskResourceEntity task resource entity; when {@code null}, {@code execUrl} is returned unchanged
     * @param requestParams      request parameters; element 1 is used as the entrance parameter name
     * @return the backup URL
     */
    private String getExecUrlBackup(String execUrl, String taskInfo, TaskResourceEntity taskResourceEntity,
                                    List<String> requestParams) {
        if (Objects.isNull(taskResourceEntity)) {
            return execUrl;
        }

        StringBuilder sb = new StringBuilder(execUrl);
        // Skip the parameter when no id is available, instead of emitting "carteObjectId=null"
        String carteObjectId = getCarteObjectId(taskInfo);
        if (Objects.nonNull(carteObjectId)) {
            sb.append(URL_PARAM_CARTE_OBJECT_ID).append(carteObjectId);
        }

        Map<String, Object> resourceContentMap = resourceContentToMap(taskResourceEntity);
        if (MapUtils.isEmpty(resourceContentMap)) {
            return sb.toString();
        }
        Object repoNameObj = resourceContentMap.get(ResourceContentConstant.KETTLE_REPO_NAME);
        if (Objects.nonNull(repoNameObj)) {
            sb.append(URL_PARAM_REP).append(repoNameObj);
        }
        Object repoAccountObj = resourceContentMap.get(ResourceContentConstant.KETTLE_REPO_ACCOUNT);
        if (Objects.nonNull(repoAccountObj)) {
            sb.append(URL_PARAM_USER).append(repoAccountObj);
        }
        // The password has its own key; reading the account key here put the account name into "pass="
        Object repoPasswordObj = resourceContentMap.get(ResourceContentConstant.KETTLE_REPO_PASSWORD);
        if (Objects.nonNull(repoPasswordObj)) {
            sb.append(URL_PARAM_PASS).append(repoPasswordObj);
        }

        String entrance = getEntrance(taskResourceEntity);
        if (StringUtils.isNotBlank(entrance)) {
            sb.append("&").append(requestParams.get(1)).append(URL_EQUAL_SIGN).append(getEntranceFullName(entrance));
        }

        return sb.toString();
    }

    /**
     * Reads the entrance value out of a task resource's resource content.
     *
     * @param taskResourceEntity task resource entity
     * @return the entrance as a string, or an empty string when the resource content is empty
     *         or holds no entrance entry
     */
    private String getEntrance(TaskResourceEntity taskResourceEntity) {
        Map<String, Object> content = resourceContentToMap(taskResourceEntity);
        Object entranceValue =
                MapUtils.isEmpty(content) ? null : content.get(ResourceContentConstant.KETTLE_REPO_ENTRANCE);
        return Objects.isNull(entranceValue) ? StringUtils.EMPTY : entranceValue.toString();
    }

    /**
     * Strips the file extension from an entrance path, i.e. everything from the last
     * {@code '.'} onwards.
     *
     * <p>An entrance without any {@code '.'} (including the empty string) is returned unchanged;
     * previously this case failed with a {@link StringIndexOutOfBoundsException}.
     *
     * @param entrance entrance path, e.g. {@code "dir/job.kjb"}
     * @return the entrance without its extension, e.g. {@code "dir/job"}
     */
    private String getEntranceFullName(String entrance) {
        int extensionStart = entrance.lastIndexOf(".");
        if (extensionStart < 0) {
            // No extension present: nothing to strip
            return entrance;
        }
        return entrance.substring(0, extensionStart);
    }

    /**
     * Extracts the last path segment of an entrance path.
     *
     * @param entranceFullName full entrance path
     * @return the text after the last {@code CONN_SLASH}, or the whole input when it contains none
     */
    private String getEntranceName(String entranceFullName) {
        // lastIndexOf yields -1 when there is no separator, so the start index becomes 0
        int nameStart = entranceFullName.lastIndexOf(CONN_SLASH) + 1;
        return entranceFullName.substring(nameStart);
    }

    /**
     * Converts the JSON resource content of a task resource into a map.
     *
     * @param taskResourceEntity task resource entity, may be {@code null}
     * @return the parsed content, or a new empty map when the entity is {@code null} or its
     *         resource content is blank
     */
    private Map<String, Object> resourceContentToMap(TaskResourceEntity taskResourceEntity) {
        String rawContent = Objects.isNull(taskResourceEntity) ? null : taskResourceEntity.getResourceContent();
        if (StringUtils.isBlank(rawContent)) {
            return new HashMap<>();
        }
        return JSONUtil.toBean(JSONUtil.parse(rawContent), new TypeReference<>(){}, true);
    }

    /**
     * Assembles the URL the execution request is sent to.
     *
     * @param baseUrl       base request address
     * @param requestParams request parameters; element 0 is appended directly after the base URL
     * @param logLevel      log level, translated through {@code PentahoLogLevelEnum}
     * @return the execution URL
     */
    private String getExecuteUrl(String baseUrl, List<String> requestParams, Integer logLevel) {
        return new StringBuilder(baseUrl)
                .append(requestParams.get(0))
                .append(KettleConstant.KETTLE_EXEC_URL_LOG_PARAM)
                .append(PentahoLogLevelEnum.getPentahoLogLevelCode(logLevel))
                .toString();
    }

    /**
     * Assembles the URL used to query the execution status.
     *
     * @param baseUrl        base URL
     * @param checkStatusUri status-check URI appended to the base URL
     * @param entranceName   ETL name sent as the name parameter
     * @param taskInfo       task instance configuration (source of the carteObjectId)
     * @param LogLineNumber  log line number to send; the parameter is omitted when {@code null}
     * @return the query URL
     */
    private String getQueryUrl(String baseUrl, String checkStatusUri, String entranceName, String taskInfo,
                               Long LogLineNumber) {
        StringBuilder url = new StringBuilder(baseUrl)
                .append(checkStatusUri)
                .append(KettleConstant.QUERY_URL_PARAM_NAME)
                .append(entranceName);

        // Optional: only present when the task info holds a carteObjectId
        String objectId = getCarteObjectId(taskInfo);
        if (objectId != null) {
            url.append(KettleConstant.QUERY_URL_PARAM_CARTE_OBJECT_ID).append(objectId);
        }

        url.append(KettleConstant.QUERY_URL_PARAM_XML);

        if (LogLineNumber != null) {
            url.append(KettleConstant.QUERY_URL_PARAM_LOG_NUMBER).append(LogLineNumber);
        }
        return url.toString();
    }

    /**
     * Looks up the request parameters of the handler registered for a resource type.
     *
     * @param resourceType resource type
     * @return the handler's request parameters
     * @throws BusinessException if no handler is registered for the resource type
     */
    private List<String> getRequestParam(Integer resourceType) {
        String typeKey = resourceType.toString();
        ResourceTypeInvokerHandler typeHandler = ResourceTypeFactory.getHandler(typeKey);
        if (typeHandler != null) {
            return typeHandler.getRequestParam();
        }
        log.error("未找到对应的资源类型处理器,resourceType:{}", resourceType);
        throw new BusinessException("未找到对应的资源类型处理器");
    }

    /**
     * Looks up the status-check URL of the handler registered for a resource type.
     *
     * @param resourceType resource type
     * @return the handler's status-check URL
     * @throws BusinessException if no handler is registered for the resource type
     */
    private String getCheckStatusUrl(Integer resourceType) {
        String typeKey = resourceType.toString();
        ResourceTypeInvokerHandler typeHandler = ResourceTypeFactory.getHandler(typeKey);
        if (typeHandler != null) {
            return typeHandler.getCheckStatusUrl();
        }
        log.error("未找到对应的资源类型处理器,resourceType:{}", resourceType);
        throw new BusinessException("未找到对应的资源类型处理器");
    }

    /**
     * Builds the base URL of a server node: scheduler URL prefix, server IP, {@code ":"},
     * program port and the Kettle API prefix, in that order.
     *
     * @param serverProgram server node information; its server entity supplies the IP
     * @return the base URL
     */
    private String getBaseUrl(ServerProgramInfoDTO serverProgram) {
        ServerEntity serverEntity = serverProgram.getServerEntity();
        String serverIp = serverEntity.getServerIp();
        Integer serverPort = serverProgram.getProgramPort();
        // The builder never leaves this method, so the unsynchronized StringBuilder is sufficient
        StringBuilder baseUrlBuilder = new StringBuilder(TaskPlanSchedulerConstant.SCHEDULER_BASE_URL_PREFIX);
        baseUrlBuilder.append(serverIp).append(":").append(serverPort).append(KettleConstant.KETTLE_BASE_URL_API_PRE_PREFIX);
        return baseUrlBuilder.toString();
    }

    /**
     * Builds the request headers: the URL-encoded repository configuration JSON and, when an
     * auth configuration is given, the authorization header.
     *
     * @param taskPlanResourceDTO task plan with its resource and resource configuration
     * @param authConfig          server node auth configuration; no authorization header is
     *                            added when it is blank
     * @return the header map
     */
    private Map<String, String> getRequestHeaders(TaskPlanResourceDTO taskPlanResourceDTO, String authConfig) {
        Map<String, Object> content = resourceContentToMap(taskPlanResourceDTO.getTaskResourceEntity());

        Map<String, String> repoConfig = new HashMap<>(12);
        // Repository name; empty string when absent
        repoConfig.put(EtlRequestConstant.HEADER_REPO_NAME,
                Objects.toString(content.get(ResourceContentConstant.KETTLE_REPO_NAME), ""));
        // Repository description; empty string when absent
        repoConfig.put(EtlRequestConstant.HEADER_REPO_DESC,
                Objects.toString(content.get(ResourceContentConstant.KETTLE_REPO_DESC), ""));
        // Database connection settings
        getRepHeadersBuildDb(repoConfig, taskPlanResourceDTO.getTaskResourceConfigEntity());

        Map<String, String> headers = new HashMap<>(9);
        headers.put(EtlRequestConstant.HEADER_REPO_CONFIG_JSON, URLUtil.encode(JSONUtil.toJsonStr(repoConfig)));

        if (StringUtils.isBlank(authConfig)) {
            return headers;
        }

        // Request authentication
        Map<String, String> auth = strToMap(authConfig);
        String userName = auth.get(ServerNodeAuthConfigConstant.USERNAME);
        String password = auth.get(ServerNodeAuthConfigConstant.PASSWORD);
        headers.put(EtlRequestConstant.HEADER_AUTH, getAuthorization(userName, password));
        return headers;
    }

    /**
     * Builds the form body of the execution request.
     *
     * <p>Parameters are added in this order: the task-plan configuration entries, the
     * carteObjectId (when {@code taskInfo} is not blank), the entrance name under the key
     * {@code requestParams.get(1)}, and finally the repository information.
     *
     * @param taskPlanResourceDTO task plan and resource information
     * @param requestParams       request parameters
     * @param taskInfo            task instance JSON configuration
     * @return the request body
     */
    private MultiValueMap<String, Object> getRequestBody(TaskPlanResourceDTO taskPlanResourceDTO,
                                                         List<String> requestParams, String taskInfo) {
        MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();

        // Task-plan configuration entries
        getRequestBodyBuildTaskPlan(body, taskPlanResourceDTO.getJsonConfig());

        // Unique carteObjectId
        if (StringUtils.isNotBlank(taskInfo)) {
            body.add(EtlRequestConstant.CARTE_OBJ_ID, getCarteObjectId(taskInfo));
        }

        TaskResourceEntity resource = taskPlanResourceDTO.getTaskResourceEntity();
        if (Objects.nonNull(resource)) {
            String entrance = getEntrance(resource);
            if (StringUtils.isNotBlank(entrance)) {
                String fullName = getEntranceFullName(entrance);
                if (StringUtils.isNotBlank(fullName)) {
                    body.add(requestParams.get(1), fullName);
                }
            }

            // Repository information
            Map<String, Object> content = resourceContentToMap(resource);
            if (!MapUtils.isEmpty(content)) {
                getRequestBodyBuildResourceContent(body, content);
            }
        }
        return body;
    }

    /**
     * Adds the repository database connection settings to the repository configuration map.
     *
     * <p>Host, port, database name and query string are taken from the resource configuration's
     * JDBC connect URL, e.g. {@code jdbc:mysql://127.0.0.1:3306/db_rep}.
     *
     * @param repConfigJsonMap     map the settings are written into
     * @param resourceConfigEntity task resource configuration entity
     */
    private void getRepHeadersBuildDb(Map<String, String> repConfigJsonMap,
                                      TaskResourceConfigEntity resourceConfigEntity) {
        // Repository type: database repository
        repConfigJsonMap.put(EtlRequestConstant.HEADER_REPO_ID, "KettleDatabaseRepository");
        // Connection name
        repConfigJsonMap.put(EtlRequestConstant.HEADER_DB_CONN_NAME, resourceConfigEntity.getConnectName());
        // Connection address
        String connectUrl = resourceConfigEntity.getConnectUrl();
        URI uri = JdbcUtil.parseJdbcUrl(connectUrl);
        // Host
        repConfigJsonMap.put(EtlRequestConstant.HEADER_DB_CONN_HOST, uri.getHost());

        String dataBaseTypeName = CommonUtil.getDataBaseTypeNameByType(resourceConfigEntity.getConnectCategory());
        // Legacy database type, kept for compatibility with CKettle 1.0.
        // Locale.ROOT keeps the identifier stable regardless of the JVM default locale
        // (e.g. a Turkish locale would otherwise map 'i' to a dotted capital I).
        repConfigJsonMap.put(EtlRequestConstant.HEADER_DB_TYPE,
                dataBaseTypeName.toUpperCase(Locale.ROOT) + "/1.0.0");
        // Database name
        repConfigJsonMap.put(EtlRequestConstant.HEADER_DB_NAME, JdbcUtil.getDbName(uri));
        // Port
        repConfigJsonMap.put(EtlRequestConstant.HEADER_DB_PORT, String.valueOf(uri.getPort()));
        // Account
        repConfigJsonMap.put(EtlRequestConstant.HEADER_DB_USER, resourceConfigEntity.getConnectAccount());
        // Password
        repConfigJsonMap.put(EtlRequestConstant.HEADER_DB_PASS, resourceConfigEntity.getConnectPassword());
        // Query string of the JDBC URL, forwarded only when present
        // (presumably the extra connection options — confirm against the receiving side)
        if (StringUtils.isNotBlank(uri.getQuery())) {
            repConfigJsonMap.put(EtlRequestConstant.HEADER_DB_EO, uri.getQuery());
        }
    }

    /**
     * Copies the task-plan configuration entries into the request body.
     *
     * <p>The configuration is a JSON array; every element that is a map contributes one
     * parameter whose value is read according to the element's declared parameter type.
     *
     * @param postParameters body the parameters are added to
     * @param jsonConfig     task-plan configuration JSON; nothing is added when blank
     */
    private void getRequestBodyBuildTaskPlan(MultiValueMap<String, Object> postParameters, String jsonConfig) {
        if (StringUtils.isBlank(jsonConfig)) {
            return;
        }

        final String nameKey = TaskJsonConfigConstant.PARAM_NAME;
        final String valueKey = TaskJsonConfigConstant.PARAM_VALUE;

        for (Object element : JSONUtil.parseArray(jsonConfig)) {
            // instanceof is false for null, so null elements are skipped here as well
            if (!(element instanceof Map)) {
                continue;
            }
            JSONObject entry = JSONUtil.parseObj(element);
            String declaredType = entry.getStr(TaskJsonConfigConstant.PARAM_TYPE);

            Object value;
            if (Objects.equals(declaredType, DataTypeEnum.STRING.getCode())) {
                value = entry.getStr(valueKey);
            } else if (Objects.equals(declaredType, DataTypeEnum.NUMBER.getCode())) {
                value = entry.get(valueKey);
            } else if (Objects.equals(declaredType, DataTypeEnum.JSON_OBJECT.getCode())) {
                value = entry.getJSONObject(valueKey);
            } else if (Objects.equals(declaredType, DataTypeEnum.ARR.getCode())) {
                value = entry.getJSONArray(valueKey);
            } else if (Objects.equals(declaredType, DataTypeEnum.BOOLEAN.getCode())) {
                value = entry.getBool(valueKey);
            } else {
                // Unknown type: pass the raw value through
                value = entry.get(valueKey);
            }
            postParameters.add(entry.getStr(nameKey), value);
        }
    }

    /**
     * Adds the repository name, account and password from the resource content to the request
     * body; entries that are absent from the resource content are skipped.
     *
     * @param postParameters     body the parameters are added to
     * @param resourceContentMap repository information
     */
    private void getRequestBodyBuildResourceContent(MultiValueMap<String, Object> postParameters,
                                                    Map<String, Object> resourceContentMap) {
        // Body keys and their values, kept in the same order: name, account, password.
        // The repository name must match the name Kettle uses to connect to the repository.
        String[] bodyKeys = {
                EtlRequestConstant.BODY_REPO_NAME,
                EtlRequestConstant.BODY_REPO_USER,
                EtlRequestConstant.BODY_REPO_PASS
        };
        Object[] repoValues = {
                resourceContentMap.get(ResourceContentConstant.KETTLE_REPO_NAME),
                resourceContentMap.get(ResourceContentConstant.KETTLE_REPO_ACCOUNT),
                resourceContentMap.get(ResourceContentConstant.KETTLE_REPO_PASSWORD)
        };
        for (int i = 0; i < bodyKeys.length; i++) {
            if (repoValues[i] != null) {
                postParameters.add(bodyKeys[i], repoValues[i]);
            }
        }
    }

    /**
     * Reads the carteObjectId entry from the task instance JSON configuration.
     *
     * @param taskInfo task instance JSON configuration
     * @return the carteObjectId, or {@code null} when the configuration has no such entry
     */
    private String getCarteObjectId(String taskInfo) {
        final Map<String, String> taskInfoEntries = strToMap(taskInfo);
        final String objectId = taskInfoEntries.get(EtlRequestConstant.CARTE_OBJ_ID);
        return objectId;
    }

    /**
     * Fetches the execution status XML from the server node and parses it into a document.
     *
     * @param taskResourceEntity task resource entity
     * @param baseUrl            base URL
     * @param taskInfo           task instance configuration
     * @param authConfig         server node auth configuration
     * @param LogLineNumber      log line number forwarded to the status query
     * @return the parsed document, or {@code null} when the response is blank or equals the
     *         server-error marker
     * @throws Exception any exception raised while requesting or parsing the status
     */
    private Document getKettleDocument(TaskResourceEntity taskResourceEntity, String baseUrl, String taskInfo,
                                       String authConfig, Long LogLineNumber) throws Exception {
        String result = executorFetchKettleXmlRequest(taskResourceEntity, baseUrl, taskInfo, authConfig, LogLineNumber);
        // Routine diagnostic output on every poll: DEBUG, so it no longer floods the error log
        log.debug("----------------------转换执行结果返回xml:{}", result);
        if (StringUtils.isBlank(result)) {
            return null;
        }
        if (result.equals(RES_SERVER_ERROR)) {
            log.error("获取任务执行结果，服务器错误！");
            return null;
        }
        return XMLHandler.loadXmlString(result);
    }

    /**
     * Requests the execution status XML from the server node.
     *
     * @param taskResourceEntity task resource entity
     * @param baseUrl            base URL
     * @param taskInfo           task instance configuration
     * @param authConfig         server node auth configuration
     * @param LogLineNumber      log line number forwarded to the status query
     * @return the response body, or {@code null} when {@code authConfig} is blank
     */
    private String executorFetchKettleXmlRequest(TaskResourceEntity taskResourceEntity, String baseUrl, String taskInfo,
                                                 String authConfig, Long LogLineNumber) {
        // The query URL is assembled before the auth check, so lookup failures surface first
        String entrance = getEntrance(taskResourceEntity);
        String statusUri = getCheckStatusUrl(taskResourceEntity.getResourceType());
        String shortName = getEntranceName(getEntranceFullName(entrance));
        String statusQueryUrl = getQueryUrl(baseUrl, statusUri, shortName, taskInfo, LogLineNumber);

        if (StringUtils.isNotBlank(authConfig)) {
            Map<String, String> auth = strToMap(authConfig);
            String userName = auth.get(ServerNodeAuthConfigConstant.USERNAME);
            String password = auth.get(ServerNodeAuthConfigConstant.PASSWORD);
            return HttpPostUtils.getInstance().post(statusQueryUrl, getAuthorization(userName, password));
        }
        return null;
    }

    /**
     * Updates the status of a task instance.
     *
     * <p>The end time is set when the new status is not one of the running statuses. When a
     * backup URL is given, it is stored under the key {@code execUrlBackup} inside the
     * instance's task info JSON.
     *
     * @param taskInstanceId task instance id
     * @param status         status code
     * @param execUrlBackup  backup URL kept for reference; ignored when blank
     */
    private void updateTaskInstanceStatus(Long taskInstanceId, Integer status, String execUrlBackup) {
        boolean stillRunning = CommonUtil.getRunTaskInstanceStatus().contains(status);

        TaskInstance patch = new TaskInstance();
        patch.setId(taskInstanceId);
        patch.setStatus(status);
        if (!stillRunning) {
            // The task has ended: record the end time
            patch.setEndTime(new Date());
        }

        if (StringUtils.isNotBlank(execUrlBackup)) {
            // Merge the backup URL into the task info currently stored for the instance
            String storedTaskInfo = taskInstanceService.getByIdIgnoreTenantId(taskInstanceId).getTaskInfo();
            Map<String, String> taskInfoEntries =
                    StringUtils.isBlank(storedTaskInfo) ? new HashMap<>() : strToMap(storedTaskInfo);
            taskInfoEntries.put("execUrlBackup", execUrlBackup);
            patch.setTaskInfo(JSONUtil.toJsonStr(taskInfoEntries));
        }

        taskInstanceService.updateByIdIgnoreTenantId(patch);
    }

    /**
     * Starts the asynchronous monitoring of a running task instance.
     *
     * <p>The monitoring stage runs on {@code executor} and polls the server node for the status
     * document until the resolver reports that monitoring should stop, or until four polls have
     * returned no document (a 6-second pause precedes every poll once a poll has failed). When it
     * finishes normally, a follow-up stage removes the carte record.
     *
     * <p>An exception whose message is {@code null} is handled like any other non-"500" failure.
     * Previously it raised a {@link NullPointerException} inside the catch block, which left the
     * instance status untouched and skipped the cleanup stage.
     *
     * @param taskPlanResourceDTO task plan information
     * @param baseUrl             base URL
     * @param taskInstance        task instance
     * @param serverProgram       server node
     * @param currentDate         current time, used as prefix of the written log lines
     * @param isIncrementalLog    whether the log is incremental
     */
    private void taskMonitorThread(TaskPlanResourceDTO taskPlanResourceDTO, String baseUrl, TaskInstance taskInstance,
                                   ServerProgramInfoDTO serverProgram, String currentDate, Boolean isIncrementalLog) {
        Long taskInstanceId = taskInstance.getId();
        String taskInfo = taskInstance.getTaskInfo();
        String carteObjectId = getCarteObjectId(taskInfo);
        Long taskPlanId = taskPlanResourceDTO.getId();
        Long orgId = taskPlanResourceDTO.getOrgId();
        Long groupId = taskPlanResourceDTO.getGroupId();
        String taskName = taskPlanResourceDTO.getTaskName();
        TaskResourceEntity taskResourceEntity = taskPlanResourceDTO.getTaskResourceEntity();
        String authConfig = serverProgram.getAuthConfig();
        Integer failCode = TaskInstanceStatusEnum.MONITOR_EXCEPTION.getCode();
        // Run asynchronously
        CompletableFuture.runAsync(() -> {
            boolean isContinue = true;
            LongAdder reqErrCount = new LongAdder();
            Long logLineNumber = getInitLogLineNumber(isIncrementalLog, true);
            TaskMonitorThreadVO taskMonitorThreadVO;
            Document document;
            try {
                while (isContinue && reqErrCount.intValue() < 4) {
                    // Back off before polling again once at least one poll has failed
                    if (reqErrCount.intValue() > 0) {
                        TimeUnit.SECONDS.sleep(6);
                    }
                    document = getKettleDocument(taskResourceEntity, baseUrl, taskInfo, authConfig, logLineNumber);
                    if (Objects.isNull(document)) {
                        // 6. Request error
                        reqErrCount.increment();
                        continue;
                    }
                    taskMonitorThreadVO = xmlResolver(document, carteObjectId, taskInstanceId, taskPlanResourceDTO,
                            isIncrementalLog, true);
                    logLineNumber = taskMonitorThreadVO.getLastLogLineNumber();
                    isContinue = taskMonitorThreadVO.getIsContinue();

                    // Per-poll progress information is diagnostic output, not an error
                    log.debug("日志行数--》：{},是否继续操作：{}", logLineNumber, isContinue);
                }
            } catch (Exception e) {
                log.error("任务：{}监控异常!执行器记录ID：{} \r\n错误原因：{} \r\n", taskName, carteObjectId, e);
                // Exceptions without a message (e.g. many NullPointerExceptions) count as non-"500"
                String message = e.getMessage();
                if (Objects.isNull(message) || !message.contains("500")) {
                    isContinue = false;
                    updateTaskInstanceStatus(taskInstanceId, failCode, null);
                    String errorLog = String.format("%s - 任务：%s \r\n监控异常：%s \r\n", currentDate, taskName, e);
                    TaskInstanceLogUtil.writerLog(orgId, groupId, taskPlanId, taskInstanceId, errorLog);
                }
            }
            // 8. Monitoring error (end)
            if (reqErrCount.intValue() > 3 && isContinue) {
                String errorLog =
                        String.format("%s - 任务：%s多次出现监控出错！执行器记录ID：%s \r\n", currentDate, taskName, carteObjectId);
                log.error(errorLog);
                TaskInstanceLogUtil.writerLog(orgId, groupId, taskPlanId, taskInstanceId, errorLog);
                updateTaskInstanceStatus(taskInstanceId, failCode, null);
            }
        }, executor).thenRunAsync(() -> {
            // Remove the carte record once the task has finished
            removeCarte(taskInstanceId, taskPlanResourceDTO, baseUrl, serverProgram.getAuthConfig());
        });

    }

    /**
     * Resolves one status document: reads the status text, stores the log output and decides
     * whether monitoring should continue.
     * <p>
     * When the document carries no logging node, nothing is stored; monitoring continues only if
     * {@code taskMonitor} is set and the status is one of {@code getPartRunningStatusSet()}.
     * Otherwise the step logs are parsed for the statuses in {@code getParserTaskStepLogStatusSet()},
     * the log text is written through {@code taskLogResolver}, and the continue flag comes from
     * {@code getContinueByStatus}.
     * </p>
     *
     * @param document            status/log document
     * @param carteObjectId       unique identifier of the executor record
     * @param taskInstanceId      task instance ID
     * @param taskPlanResourceDTO task plan
     * @param isIncrementalLog    whether the log is fetched incrementally
     * @param taskMonitor         whether monitoring is active
     * @return holder with the continue flag, the last log line number and the status text
     * @throws Exception any failure while resolving the document
     */
    private TaskMonitorThreadVO xmlResolver(Document document, Object carteObjectId, Long taskInstanceId,
                                            TaskPlanResourceDTO taskPlanResourceDTO, Boolean isIncrementalLog, Boolean taskMonitor) throws Exception {

        Long taskPlanId = taskPlanResourceDTO.getId();
        Long orgId = taskPlanResourceDTO.getOrgId();
        Long groupId = taskPlanResourceDTO.getGroupId();
        String taskName = taskPlanResourceDTO.getTaskName();
        Integer resourceType = taskPlanResourceDTO.getTaskResourceEntity().getResourceType();

        Node xmlNode = XMLHandler.getSubNode(document, getCheckStatusUrl(resourceType).toLowerCase());
        // Read the status description from the XML.
        String statusText = parsingTaskStatusNode(xmlNode);

        // Diagnostic output only; it is emitted on every poll and therefore kept at DEBUG level.
        log.debug("获取日志拿到的状态是什么鬼：{}", statusText);

        Node loggingString = XMLHandler.getSubNode(xmlNode, KettleXmlConstant.LOGGING_STR);
        if (Objects.isNull(loggingString)) {
            log.warn("未查询到任务：{},执行器记录ID：{}的执行日志信息！", taskName, carteObjectId);
            Long initLogLineNumber = getInitLogLineNumber(isIncrementalLog, taskMonitor);
            if (getPartRunningStatusSet().contains(statusText) && taskMonitor) {
                // Yield so other monitoring work can proceed.
                Thread.yield();
                return buildTaskMonitorThreadVO(initLogLineNumber, true, statusText);
            }
            return buildTaskMonitorThreadVO(initLogLineNumber, false, statusText);
        }
        String textContent = loggingString.getTextContent();
        textContent = textContent.replace("<![CDATA[", "").replace("]]>", "");
        // Parsing step logs is costly for the database and hardware, so it is limited to selected statuses.
        if (getParserTaskStepLogStatusSet().contains(statusText)) {
            // Parse the detailed step logs.
            taskStepLogResolver(xmlNode, statusText, taskInstanceId);
        }
        // Store the log text.
        taskLogResolver(textContent, orgId, groupId, taskPlanId, taskInstanceId);

        Long logLineNumber = taskLogLineNumberResolver(xmlNode, isIncrementalLog, taskMonitor);

        // false: not running, stop; true: still running, keep going.
        boolean continueByStatus =
                getContinueByStatus(statusText, taskName, carteObjectId, taskInstanceId, taskMonitor);

        return buildTaskMonitorThreadVO(logLineNumber, continueByStatus, statusText);
    }

    /**
     * Repeatedly checks whether the server node can be reached.
     * <p>
     * Used after a request timed out while the server node may in fact be executing the task.
     * The first probe runs immediately; after a failed probe the next one is delayed by 60 seconds.
     * The first successful probe ends the check. After 10 failed probes the task instance is set to
     * {@code INIT_FAILED} and the original exception is rethrown.
     * </p>
     *
     * @param e                   exception that triggered the check; rethrown when all probes fail
     * @param baseUrl             base URL
     * @param taskInfo            task instance configuration
     * @param taskPlanResourceDTO task plan
     * @param serverProgram       server node information
     * @param taskInstanceId      task instance ID
     * @return {@code "SEND SUCCESS"} once a probe succeeds
     * @throws Exception the given exception when the retry limit is reached, or any failure while
     *                   waiting for a probe
     */
    private String spinCheckConn(ResourceAccessException e, String baseUrl, String taskInfo,
                                 TaskPlanResourceDTO taskPlanResourceDTO, ServerProgramInfoDTO serverProgram, Long taskInstanceId)
            throws Exception {
        TaskResourceEntity taskResourceEntity = taskPlanResourceDTO.getTaskResourceEntity();
        String authConfig = serverProgram.getAuthConfig();
        log.info("服务超时，但服务节点可能正在执行，此处检查是否连接成功：{},当前时间：{}", baseUrl, LocalDate.now());

        // A probe reports success as true; a failure is logged and reported as false.
        Callable<Boolean> probe = () -> {
            try {
                executorFetchKettleXmlRequest(taskResourceEntity, baseUrl, taskInfo, authConfig, null);
                return true;
            } catch (Exception retryEx) {
                log.warn("服务节点连接检测失败：{}", baseUrl, retryEx);
                return false;
            }
        };

        int maxFailures = 10;
        int failures = 0;
        // Single-threaded scheduler used to delay the retries.
        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            for (; ; ) {
                // After a failure wait one minute before probing again.
                long delaySeconds = failures > 0 ? 60 : 0;
                ScheduledFuture<Boolean> attempt = scheduledExecutor.schedule(probe, delaySeconds, TimeUnit.SECONDS);
                if (attempt.get()) {
                    // The server node answered.
                    return "SEND SUCCESS";
                }
                failures++;
                if (failures >= maxFailures) {
                    // Retry limit reached: mark the instance as failed and give up.
                    updateTaskInstanceStatus(taskInstanceId, TaskInstanceStatusEnum.INIT_FAILED.getCode(), null);
                    throw e;
                }
            }
        } finally {
            // Always release the scheduler thread, on success as well as on failure.
            scheduledExecutor.shutdownNow();
        }
    }

    /**
     * Parses the per-step log entries of a status document.
     * <p>
     * Every step node is converted into a {@code TaskStepLogVO}. The collected entries are saved
     * only when the status is not RUNNING. Any exception is logged and not propagated.
     * </p>
     *
     * @param xmlNode        XML node holding the status information
     * @param statusText     status text
     * @param taskInstanceId task instance ID
     */
    public void taskStepLogResolver(Node xmlNode, String statusText, Long taskInstanceId) {
        try {
            // Read the header values and the list of step nodes.
            Node nameNode = XMLHandler.getSubNode(xmlNode, KettleXmlConstant.TRANS_NAME);
            Node dateNode = XMLHandler.getSubNode(xmlNode, KettleXmlConstant.LOG_DATE);
            String transName = XMLHandler.getNodeValue(nameNode);
            String logDate = XMLHandler.getNodeValue(dateNode);
            Node stepListNode = XMLHandler.getSubNode(xmlNode, KettleXmlConstant.STEP_STATUS_LIST_NODE);
            List<Node> stepNodes = XMLHandler.getNodes(stepListNode, KettleXmlConstant.STEP_STATUS);

            // Convert every step node into a log entry.
            List<TaskStepLogVO> collected = new ArrayList<>();
            for (Node stepNode : stepNodes) {
                Map<String, Object> stepValues = parsingTaskStepNode(stepNode);
                gleanTaskStepLog(stepValues, transName, logDate, taskInstanceId, collected);
            }

            // Persist only once the task is no longer running.
            boolean stillRunning = Objects.equals(statusText, TransJobStatusEnum.RUNNING.getName());
            if (!stillRunning) {
                saveTaskStepLog(collected);
            }
        } catch (Exception e) {
            log.error("---------------解析步骤日志：转换 xml 对象异常！！！！！", e);
        }
    }

    /**
     * Reads the status description from the status node.
     *
     * @param xmlNode root node
     * @return text of the status description node, or the EXEC_ERROR status name when that node is absent
     */
    private String parsingTaskStatusNode(Node xmlNode) {
        Node descriptionNode = XMLHandler.getSubNode(xmlNode, KettleXmlConstant.STATUS_DESC);
        return Objects.nonNull(descriptionNode)
                ? descriptionNode.getTextContent()
                : TransJobStatusEnum.EXEC_ERROR.getName();
    }

    /**
     * Reads the number of the last log line from the status node.
     *
     * @param xmlNode          root node
     * @param isIncrementalLog whether the log is fetched incrementally
     * @param taskMonitor      whether monitoring is active
     * @return the last log line number, or {@code null} when monitoring or incremental logging is
     *         off, or when the node is missing or blank
     * @throws NumberFormatException if the node text is not a valid number
     */
    private Long taskLogLineNumberResolver(Node xmlNode, Boolean isIncrementalLog, Boolean taskMonitor) {
        if (!taskMonitor || !isIncrementalLog) {
            return null;
        }
        Node lastLogLineNrNode = XMLHandler.getSubNode(xmlNode, KettleXmlConstant.LAST_LOG_LINE_NR);
        if (Objects.isNull(lastLogLineNrNode)) {
            return null;
        }
        String textContent = lastLogLineNrNode.getTextContent();
        if (StringUtils.isBlank(textContent)) {
            return null;
        }
        // Trim first: the blank check above tolerates surrounding whitespace, Long.valueOf does not.
        return Long.valueOf(textContent.trim());
    }

    /**
     * Converts the child elements of one step node into a map of typed values.
     * <p>
     * The step name is stored under the key {@code "stepName"}; every other element is stored
     * under its element name. A value equal to {@code KettleXmlConstant.PRIORITY_VAL} is stored as
     * {@code null}. Speed values and values containing a comma are parsed with a grouping number
     * format, the seconds value as a double and the counter fields as longs (blank counts as 0).
     * A value that fails numeric parsing with a {@code NumberFormatException} is stored as its raw text.
     * </p>
     *
     * @param stepNode step node
     * @return values of the step, empty when the node has no elements
     * @throws ParseException if a grouped number cannot be parsed
     */
    private Map<String, Object> parsingTaskStepNode(Node stepNode) throws ParseException {
        Map<String, Object> stepValues = new HashMap<>();
        String[] elementNames = XMLHandler.getNodeElements(stepNode);
        if (Objects.isNull(elementNames) || elementNames.length == 0) {
            return stepValues;
        }
        for (String elementName : elementNames) {
            Node child = XMLHandler.getSubNode(stepNode, elementName);
            if (Objects.isNull(child)) {
                continue;
            }
            String rawValue = child.getTextContent();
            try {
                if (Objects.equals(elementName, KettleXmlConstant.STEP_NAME)) {
                    stepValues.put("stepName", rawValue);
                } else if (rawValue.trim().equals(KettleXmlConstant.PRIORITY_VAL)) {
                    // Placeholder value: keep the key but store no value.
                    stepValues.put(elementName, null);
                } else if (Objects.equals(elementName, KettleXmlConstant.SPEED) || rawValue.indexOf(",") > 0) {
                    // Numbers written with grouping separators.
                    stepValues.put(elementName, new DecimalFormat(",###").parse(rawValue.trim()));
                } else if (Objects.equals(elementName, KettleXmlConstant.SECONDS)) {
                    stepValues.put(elementName, Double.parseDouble(rawValue.trim()));
                } else if (Objects.equals(elementName, KettleXmlConstant.COPY)
                        || Objects.equals(elementName, KettleXmlConstant.LINES_READ)
                        || Objects.equals(elementName, KettleXmlConstant.LINES_WRITTEN)
                        || Objects.equals(elementName, KettleXmlConstant.LINES_INPUT)
                        || Objects.equals(elementName, KettleXmlConstant.LINES_OUTPUT)
                        || Objects.equals(elementName, KettleXmlConstant.LINES_UPDATED)
                        || Objects.equals(elementName, KettleXmlConstant.LINES_REJECTED)
                        || Objects.equals(elementName, KettleXmlConstant.ERRORS)) {
                    // Counter fields: blank means zero.
                    stepValues.put(elementName, StringUtils.isBlank(rawValue) ? 0L : Long.parseLong(rawValue.trim()));
                } else {
                    stepValues.put(elementName, rawValue);
                }
            } catch (NumberFormatException e) {
                // Not numeric after all: fall back to the raw text.
                stepValues.put(elementName, rawValue);
            }
        }
        return stepValues;
    }

    /**
     * Decodes the log text and writes it to the log of the task instance.
     *
     * @param logText        encoded log content, decoded with {@code HttpUtil.decodeBase64ZippedString}
     * @param orgId          organization ID
     * @param groupId        group ID
     * @param taskPlanId     task plan ID
     * @param taskInstanceId task instance ID
     * @throws IOException if decoding or writing fails
     */
    public void taskLogResolver(String logText, Long orgId, Long groupId, Long taskPlanId, Long taskInstanceId)
            throws IOException {
        TaskInstanceLogUtil.writerLog(orgId, groupId, taskPlanId, taskInstanceId,
                HttpUtil.decodeBase64ZippedString(logText));
    }

    /**
     * Returns the initial log line number.
     *
     * @param isIncrementalLog whether the log is fetched incrementally
     * @param taskMonitor      whether thread monitoring is active
     * @return {@code 0} when both flags are true, otherwise {@code null}
     */
    private Long getInitLogLineNumber(Boolean isIncrementalLog, Boolean taskMonitor) {
        if (!isIncrementalLog || !taskMonitor) {
            return null;
        }
        return 0L;
    }

    /**
     * Builds the monitoring result holder.
     *
     * @param logLineNumber    number of the last log line
     * @param continueByStatus whether monitoring should continue
     * @param statusText       executor status text
     * @return the populated {@code TaskMonitorThreadVO}
     */
    private TaskMonitorThreadVO buildTaskMonitorThreadVO(Long logLineNumber, Boolean continueByStatus, String statusText) {
        return TaskMonitorThreadVO.builder()
                .lastLogLineNumber(logLineNumber)
                .isContinue(continueByStatus)
                .carteStatusText(statusText)
                .build();
    }

    /**
     * Creates one step log entry from the parsed values and appends it to the given list.
     * <p>
     * The map is copied onto a new {@code TaskStepLogVO} through a {@code BeanMap}. When the
     * priority is non-blank and differs from {@code KettleXmlConstant.PRIORITY_VAL}, it is split on
     * {@code CONN_SLASH} and the two parts are stored as input and output buffer rows.
     * </p>
     *
     * @param map            parsed step values
     * @param transName      transformation name
     * @param logDate        log date
     * @param taskInstanceId task instance ID
     * @param taskStepLogs   list that receives the new entry
     */
    private void gleanTaskStepLog(Map<String, Object> map, String transName, String logDate, Long taskInstanceId,
                                  List<TaskStepLogVO> taskStepLogs) {
        TaskStepLogVO stepLog = new TaskStepLogVO();
        // Copy the parsed values onto the bean properties.
        BeanMap.create(stepLog).putAll(map);
        stepLog.setTaskInstanceId(taskInstanceId);
        stepLog.setTransName(transName);
        stepLog.setLogDate(logDate);

        String priority = stepLog.getPriority();
        boolean hasBufferInfo =
                StringUtils.isNotBlank(priority) && !Objects.equals(priority, KettleXmlConstant.PRIORITY_VAL);
        if (hasBufferInfo) {
            String[] bufferRows = priority.split(CONN_SLASH);
            stepLog.setInputBufferRows(Long.parseLong(bufferRows[0].trim()));
            stepLog.setOutputBufferRows(Long.parseLong(bufferRows[1].trim()));
        }
        taskStepLogs.add(stepLog);
    }

    /**
     * Converts step log view objects into {@code TaskStepLog} entities.
     *
     * @param taskStepLogs source entries
     * @return one entity per source entry, in the same order
     */
    private List<TaskStepLog> transTaskStepLog(List<TaskStepLogVO> taskStepLogs) {
        List<TaskStepLog> entities = new ArrayList<>();
        for (TaskStepLogVO source : taskStepLogs) {
            TaskStepLog entity = new TaskStepLog();

            // Identification of the step.
            entity.setTaskInstanceId(source.getTaskInstanceId());
            entity.setTransName(source.getTransName());
            entity.setStepName(source.getStepName());
            entity.setStepCopy(source.getCopy());
            entity.setLogDate(source.getLogDate());

            // Row counters.
            entity.setLinesRead(source.getLinesRead());
            entity.setLinesWritten(source.getLinesWritten());
            entity.setLinesInput(source.getLinesInput());
            entity.setLinesOutput(source.getLinesOutput());
            entity.setLinesUpdated(source.getLinesUpdated());
            entity.setLinesRejected(source.getLinesRejected());
            entity.setErrors(source.getErrors());

            // Buffer sizes.
            entity.setInputBufferRows(source.getInputBufferRows());
            entity.setOutputBufferRows(source.getOutputBufferRows());

            // Runtime figures and status.
            entity.setRunningTime(source.getSeconds());
            entity.setSpeed(source.getSpeed());
            entity.setStatusDescription(source.getStatusDescription());

            entities.add(entity);
        }
        return entities;
    }

    /**
     * Converts the given step log entries to entities and saves them in one batch.
     *
     * @param taskStepLogs step log entries to save
     */
    private void saveTaskStepLog(List<TaskStepLogVO> taskStepLogs) {
        List<TaskStepLog> entities = transTaskStepLog(taskStepLogs);
        taskStepLogService.saveBatch(entities);
    }

    /**
     * Handles a failed task call: writes the stack trace to the task instance log and marks the
     * instance as {@code INIT_FAILED}.
     * <p>
     * When the cause of the exception is a {@code ConnectException}, the log text is preceded by a
     * hint on how to check connectivity to the server node. The status update runs in a
     * {@code finally} block, so it happens even if writing the log fails.
     * </p>
     *
     * @param e                   exception
     * @param currentDate         current time text, used as prefix of the log line
     * @param baseUrl             base URL
     * @param execUrlBackup       backup URL passed on to the status update
     * @param taskPlanResourceDTO task plan
     * @param taskInstanceId      task instance ID
     */
    private void timeOutHandlerProcess(Exception e, String currentDate, String baseUrl, String execUrlBackup,
                                       TaskPlanResourceDTO taskPlanResourceDTO, Long taskInstanceId) {
        Long orgId = taskPlanResourceDTO.getOrgId();
        Long groupId = taskPlanResourceDTO.getGroupId();
        Long taskPlanId = taskPlanResourceDTO.getId();

        try {
            // Render the stack trace as text.
            StringWriter traceWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(traceWriter));
            String detail = traceWriter.toString();

            if (e.getCause() instanceof ConnectException) {
                // Connection problems get an explanatory hint in front of the stack trace.
                detail = "任务调用请求超时！请检查服务节点状态或者网络安全策略,确认服务节点能正常通讯。"
                        + "确认方式：在服务节点所在服务器上使用telnet、wget等命令工具或者浏览器访问当前调用服务节点地址:" + baseUrl
                        + "  \r\n 错误详情：" + detail;
            }

            // Write the log entry.
            TaskInstanceLogUtil.writerLog(orgId, groupId, taskPlanId, taskInstanceId,
                    String.format("%s - %s", currentDate, detail));
        } finally {
            updateTaskInstanceStatus(taskInstanceId, TaskInstanceStatusEnum.INIT_FAILED.getCode(), execUrlBackup);
        }
    }

    /**
     * Decides from the status text whether processing continues, and updates the task instance status.
     * <p>
     * {@code false}: not running, stop; {@code true}: running, continue.
     * </p>
     * <p>
     * The status code to store is looked up from the status text when monitoring is active; when no
     * code is found that way, the status currently stored for the task instance is used. For the
     * PREPARING status the code IN_SERVICE is stored instead.
     * </p>
     *
     * @param statusText     status text
     * @param taskName       task plan name
     * @param carteObjectId  unique carte identifier
     * @param taskInstanceId task instance ID
     * @param taskMonitor    whether monitoring is active
     * @return {@code true} when the status is one of {@code getRunningStatusSet()}, otherwise the
     *         result of {@code noneRunning}
     */
    private boolean getContinueByStatus(String statusText, String taskName, Object carteObjectId, Long taskInstanceId,
                                        Boolean taskMonitor) {
        Integer statusCode = null;
        if (taskMonitor) {
            statusCode = CommonUtil.getTaskInstanceCodeByStatusValue(statusText);
        }
        if (Objects.isNull(statusCode)) {
            // Fall back to the status currently stored for the instance.
            statusCode = taskInstanceService.getByIdIgnoreTenantId(taskInstanceId).getStatus();
        }

        if (!getRunningStatusSet().contains(statusText)) {
            // Not running: finished, stopped, paused or failed.
            return noneRunning(statusText, taskName, carteObjectId, taskInstanceId);
        }

        // Running states: waiting, running, preparing, initializing.
        log.info("任务：{},执行器记录ID：【{}】 执行中，当前状态:{}", taskName, carteObjectId, statusText);
        if (Objects.equals(TransJobStatusEnum.PREPARING.getName(), statusText)) {
            statusCode = TaskInstanceStatusEnum.IN_SERVICE.getCode();
        }
        updateTaskInstanceStatus(taskInstanceId, statusCode, null);

        boolean isRunning = Objects.equals(statusText, TransJobStatusEnum.RUNNING.getName());
        if (!isRunning && taskMonitor) {
            // Yield so other monitoring work can proceed.
            Thread.yield();
        }
        return true;
    }

    /**
     * Handles a status that is not a running one: logs it and stores the final instance status.
     * <p>
     * FINISHED is stored as SUCCESS; STOPPED, PAUSED and every other status are stored as FAIL.
     * </p>
     *
     * @param statusText     status text
     * @param taskName       task plan name
     * @param carteObjectId  unique carte execution identifier
     * @param taskInstanceId task instance ID
     * @return always {@code false}
     */
    private boolean noneRunning(String statusText, String taskName, Object carteObjectId, Long taskInstanceId) {

        // Everything except FINISHED ends as a failure.
        Integer finalStatus = TaskInstanceStatusEnum.FAIL.getCode();
        if (Objects.equals(statusText, TransJobStatusEnum.FINISHED.getName())) {
            log.info("任务：{},执行器记录ID：【{}】 任务执行完成...", taskName, carteObjectId);
            finalStatus = TaskInstanceStatusEnum.SUCCESS.getCode();
        } else if (Objects.equals(statusText, TransJobStatusEnum.STOPPED.getName())) {
            log.info("任务：{},执行器记录ID：【{}】 任务执行已停止...", taskName, carteObjectId);
        } else if (Objects.equals(statusText, TransJobStatusEnum.PAUSED.getName())) {
            log.info("任务：{},执行器记录ID：【{}】 任务执行被暂停...", taskName, carteObjectId);
        } else {
            // TODO halting: the transformation should be investigated so this state is avoided, and the task process should be stopped.
            log.error("任务：{},执行器记录ID：【{}】 任务出现错误，状态码：{}", taskName, carteObjectId, statusText);
        }
        updateTaskInstanceStatus(taskInstanceId, finalStatus, null);
        return false;

    }

    /**
     * Removes the carte record of a task instance.
     * <p>
     * The task instance is reloaded and the removal request is only sent when its status text is
     * one of {@code getPartRemoveStatusSet()}. The request URL is built from the base URL, the
     * remove URI of the resource type, the entrance name and, when present, the carte object ID.
     * </p>
     *
     * @param taskInstanceId      task instance ID
     * @param taskPlanResourceDTO task plan
     * @param baseUrl             base URL
     * @param authConfig          auth configuration as JSON text (parsed with {@code strToMap})
     */
    private void removeCarte(Long taskInstanceId, TaskPlanResourceDTO taskPlanResourceDTO, String baseUrl, String authConfig) {

        TaskResourceEntity resource = taskPlanResourceDTO.getTaskResourceEntity();
        TaskInstance current = taskInstanceService.getByIdIgnoreTenantId(taskInstanceId);
        String carteObjectId = getCarteObjectId(current.getTaskInfo());
        String statusText = TaskInstanceStatusEnum.matchValue(current.getStatus());

        if (!getPartRemoveStatusSet().contains(statusText)) {
            return;
        }

        List<String> requestParams = getRequestParam(resource.getResourceType());
        String entrance = getEntrance(resource);
        // The third request parameter is the remove URI.
        String removeUri = requestParams.get(2);
        String entranceName = getEntranceName(getEntranceFullName(entrance));

        StringBuilder url = new StringBuilder(baseUrl)
                .append(removeUri)
                .append(KettleConstant.QUERY_URL_PARAM_NAME)
                .append(entranceName);
        if (Objects.nonNull(carteObjectId)) {
            url.append(KettleConstant.QUERY_URL_PARAM_CARTE_OBJECT_ID).append(carteObjectId);
        }
        url.append(KettleConstant.QUERY_URL_PARAM_XML);

        Map<String, String> credentials = strToMap(authConfig);
        String authorization = getAuthorization(credentials.get(ServerNodeAuthConfigConstant.USERNAME),
                credentials.get(ServerNodeAuthConfigConstant.PASSWORD));
        String result = HttpPostUtils.getInstance().post(url.toString(), authorization);
        log.info("对象ID：{},执行记录删除结果：{}", carteObjectId, result);

    }

    /**
     * Returns the names of the statuses INIT_JOB, INIT_TRANS and PREPARING.
     *
     * @return a new mutable set with the three status names
     */
    private Set<String> getPartRunningStatusSet() {
        Set<String> statuses = new HashSet<>();
        statuses.add(TransJobStatusEnum.INIT_JOB.getName());
        statuses.add(TransJobStatusEnum.INIT_TRANS.getName());
        statuses.add(TransJobStatusEnum.PREPARING.getName());
        return statuses;
    }

    /**
     * Returns the names of the statuses for which step logs are parsed.
     * <p>
     * RUNNING, FINISHED, STOPPED and FINISHED_WITH_ERRORS.
     * </p>
     *
     * @return a new mutable set with the four status names
     */
    private Set<String> getParserTaskStepLogStatusSet() {
        Set<String> statuses = new HashSet<>();
        statuses.add(TransJobStatusEnum.RUNNING.getName());
        statuses.add(TransJobStatusEnum.FINISHED.getName());
        statuses.add(TransJobStatusEnum.STOPPED.getName());
        statuses.add(TransJobStatusEnum.FINISHED_WITH_ERRORS.getName());
        return statuses;
    }

    /**
     * Returns the names of the statuses that count as running.
     * <p>
     * RUNNING, INIT_JOB, INIT_TRANS and PREPARING.
     * </p>
     *
     * @return a new mutable set with the four status names
     */
    private Set<String> getRunningStatusSet() {
        Set<String> statuses = new HashSet<>();
        statuses.add(TransJobStatusEnum.RUNNING.getName());
        statuses.add(TransJobStatusEnum.INIT_JOB.getName());
        statuses.add(TransJobStatusEnum.INIT_TRANS.getName());
        statuses.add(TransJobStatusEnum.PREPARING.getName());
        return statuses;
    }

    /**
     * Returns the names of the statuses EXEC_ERROR, FINISHED and FINISHED_WITH_ERRORS.
     *
     * @return a new mutable set with the three status names
     */
    private Set<String> getPartRemoveStatusSet() {
        Set<String> statuses = new HashSet<>();
        // TODO when a task is stopped manually, the carte record has to be removed where the manual stop happens
        statuses.add(TransJobStatusEnum.EXEC_ERROR.getName());
        statuses.add(TransJobStatusEnum.FINISHED.getName());
        statuses.add(TransJobStatusEnum.FINISHED_WITH_ERRORS.getName());
        return statuses;
    }

    /**
     * Builds the authorization value from account and password.
     *
     * @param userName account
     * @param password password
     * @return {@code BASIC} followed by the Base64 encoding of {@code userName:password} (UTF-8 bytes)
     */
    private String getAuthorization(String userName, String password) {
        byte[] credentials = (userName + ":" + password).getBytes(StandardCharsets.UTF_8);
        return BASIC + Base64.encode(credentials);
    }

    /**
     * Builds the headers for an image query request.
     * <p>
     * The result contains the repository name, account and password when present in the
     * repository configuration, the authorization value derived from the auth configuration, and
     * a URL-encoded JSON text with repository name, repository description and the database
     * connection settings added by {@code getRepHeadersBuildDb}.
     * </p>
     *
     * @param resourceContentMap      repository configuration
     * @param authConfig              auth configuration as JSON text
     * @param taskInstanceResourceDTO task instance resource
     * @return headers
     */
    private Map<String, Object> getImgReader(Map<String, Object> resourceContentMap, String authConfig, TaskInstanceResourceDTO taskInstanceResourceDTO) {

        Map<String, Object> headers = new HashMap<>();

        // Repository parameters are only added when they are configured.
        // The repository name is the name Kettle uses to connect to the repository and must be exact.
        Object repoName = resourceContentMap.get(ResourceContentConstant.KETTLE_REPO_NAME);
        if (Objects.nonNull(repoName)) {
            headers.put(EtlRequestConstant.BODY_REPO_NAME, repoName);
        }
        Object repoAccount = resourceContentMap.get(ResourceContentConstant.KETTLE_REPO_ACCOUNT);
        if (Objects.nonNull(repoAccount)) {
            headers.put(EtlRequestConstant.BODY_REPO_USER, repoAccount);
        }
        Object repoPassword = resourceContentMap.get(ResourceContentConstant.KETTLE_REPO_PASSWORD);
        if (Objects.nonNull(repoPassword)) {
            headers.put(EtlRequestConstant.BODY_REPO_PASS, repoPassword);
        }

        // Authorization of the server node.
        Map<String, String> credentials = strToMap(authConfig);
        headers.put(EtlRequestConstant.HEADER_AUTH,
                getAuthorization(credentials.get(ServerNodeAuthConfigConstant.USERNAME),
                        credentials.get(ServerNodeAuthConfigConstant.PASSWORD)));

        // Repository configuration; missing name or description becomes an empty string.
        Map<String, String> repConfigJsonMap = new HashMap<>(12);
        repConfigJsonMap.put(EtlRequestConstant.HEADER_REPO_NAME, Objects.toString(repoName, ""));
        repConfigJsonMap.put(EtlRequestConstant.HEADER_REPO_DESC,
                Objects.toString(resourceContentMap.get(ResourceContentConstant.KETTLE_REPO_DESC), ""));

        // Add the database connection settings.
        getRepHeadersBuildDb(repConfigJsonMap, taskInstanceResourceDTO.getTaskResourceConfigEntity());

        String encodedConfig = URLUtil.encode(JSONUtil.toJsonStr(repConfigJsonMap));
        headers.put(EtlRequestConstant.HEADER_REPO_CONFIG_JSON, encodedConfig);

        return headers;
    }

    /**
     * Converts a JSON formatted string into a map.
     *
     * @param content string content
     * @return map of the JSON entries; conversion is done with the ignore-error flag set
     */
    private Map<String,String> strToMap(String content){
        TypeReference<Map<String, String>> mapType = new TypeReference<>() {
        };
        return JSONUtil.toBean(JSONUtil.parse(content), mapType, true);
    }
}
